//import java.text.SimpleDateFormat
//import org.apache.flink.api.java.tuple.Tuple
//import org.apache.flink.streaming.api.TimeCharacteristic
//import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
//import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
//import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
//import org.apache.flink.streaming.api.scala._
//import org.apache.flink.streaming.api.watermark.Watermark
//import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
//import org.apache.flink.streaming.api.windowing.time.Time
//import org.apache.flink.streaming.api.windowing.windows.TimeWindow
//import org.apache.flink.util.Collector
//
//
///**
//  * @description: ${本案例模拟的是：以事件时间为标准，窗口滚动时间为5秒}
//  * @author: Liu Jun Jun
//  * @create: 2020-06-28 18:31
//  **/
//object PeriodicWatermarks {
//  def main(args: Array[String]): Unit = {
//
//    val env = StreamExecutionEnvironment.getExecutionEnvironment
//    //设置以事件时间为基准
//    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//    //并行度设置为1。关于并行度的案例会在后面测试
//    env.setParallelism(1)
//    //设置10s生成一次水位线
//    env.getConfig.setAutoWatermarkInterval(10000)
//
//    val dataDS = env.socketTextStream("Desktop", 3456)//nc -lk接收数据
//
//
//    val tsDS = dataDS.map(str => {
//      val strings = str.split(",")
//      (strings(0), strings(1).toLong, 1)
//    }).assignTimestampsAndWatermarks(
//
//      new AssignerWithPeriodicWatermarks[(String,Long,Int)]{
//        var maxTs :Long= 0
//        //得到水位线，周期性调用这个方法，得到水位线，我这里设置的也就是延迟5秒
//        override def getCurrentWatermark: Watermark = new Watermark(maxTs - 5000)
//        //负责抽取事件事件
//        override def extractTimestamp(element: (String, Long, Int), previousElementTimestamp: Long): Long = {
//          maxTs = maxTs.max(element._2 * 1000L)
//          element._2 * 1000L
//        }
//      }
//
//
//
//
//      /*new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(5)) {
//        override def extractTimestamp(element: (String, Long, Int)): Long = element._2 * 1000
//      }*/
//    )
//    val result = tsDS
//      .keyBy(0)
//      //窗口大小为5s的滚动窗口
//      .timeWindow(Time.seconds(5))
//      .apply {
//        (tuple: Tuple, window: TimeWindow, es: Iterable[(String, Long, Int)], out: Collector[String]) => {
//          val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
//          //out.collect(s"window:[${sdf.format(new Date(window.getStart))}-${sdf.format(new Date(window.getEnd))}]:{ ${es.mkString(",")} }")
//          out.collect(s"window:[${window.getStart}-${window.getEnd}]:{ ${es.mkString(",")} }")
//        }
//      }
//    tsDS.print("water")
//    result.print("windows:>>>")
//
//    env.execute()
//  }
//}